import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder
  .appName("My Spark Application") // optional and will be autogenerated if not specified
  .master("local[*]") // avoid hardcoding the deployment environment
  .enableHiveSupport() // self-explanatory, isn't it?
  .config("spark.sql.warehouse.dir", "target/spark-warehouse")
  .getOrCreate
SparkSession — The Entry Point to Spark SQL
SparkSession is the entry point to Spark SQL. It is the very first object you have to create to start developing Spark SQL applications using the fully-typed Dataset (and untyped DataFrame) data abstractions.
Note: SparkSession merged SQLContext and HiveContext into one object as of Spark 2.0.0.
You use the SparkSession.builder method to create an instance of SparkSession.
You stop the current SparkSession using the stop method.
spark.stop
You can have multiple SparkSessions in a single Spark application.
Internally, SparkSession requires a SparkContext and an optional SharedState (that represents the shared state across SparkSession instances).
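| Method | Description |
|---|---|
| builder | "Opens" a builder to get or create a SparkSession instance. |
| version | Returns the current version of Spark. |
| implicits | Use import spark.implicits._ to import the implicit conversions. |
| emptyDataset | Creates an empty Dataset[T]. |
| range | Creates a Dataset[Long]. |
| sql | Executes a SQL query (and returns a DataFrame). |
| udf | Access to user-defined functions (UDFs). |
| table | Creates a DataFrame from a table. |
| catalog | Access to the catalog of the entities of structured queries. |
| read | Access to DataFrameReader. |
| conf | Access to the current runtime configuration. |
| readStream | Access to DataStreamReader. |
| streams | Access to StreamingQueryManager. |
| newSession | Creates a new SparkSession. |
| stop | Stops the SparkSession. |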
Tip: Use the spark.sql.warehouse.dir Spark property to change the location of Hive's hive.metastore.warehouse.dir property. Refer to SharedState in this document to learn about (the low-level details of) Spark SQL support for Apache Hive. See also the official Hive Metastore Administration document.
Creating SparkSession Using Builder — builder method
builder(): Builder
builder creates a new Builder that you use to build a fully-configured SparkSession using a fluent API.
import org.apache.spark.sql.SparkSession
val builder = SparkSession.builder
Tip: Read about the Fluent interface design pattern on Wikipedia.
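The fluent style of the builder can be sketched as follows. This is a minimal illustration, assuming a local master and a made-up application name (`builder-demo`); the `spark.sql.shuffle.partitions` value is arbitrary. It also shows that a second getOrCreate call hands back the session that already exists rather than building another one.

```scala
import org.apache.spark.sql.SparkSession

// Every builder method returns the Builder itself, so calls can be chained.
val spark = SparkSession.builder
  .appName("builder-demo")                      // hypothetical application name
  .master("local[*]")                           // assumption: local mode, for experimentation only
  .config("spark.sql.shuffle.partitions", "4")  // arbitrary value for illustration
  .getOrCreate()

// A second getOrCreate reuses the existing session instead of creating a new one.
val same = SparkSession.builder.getOrCreate()
println(same eq spark)
```

In spark-shell a session already exists as `spark`, so the first call would simply return it with the extra options applied.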
Accessing Version of Spark — version Method
version: String
version returns the version of Apache Spark in use.
Internally, version uses the spark.SPARK_VERSION value, which is the version property in the spark-version-info.properties file on the CLASSPATH.
Implicit Conversions — implicits object
The implicits object is a helper class with the Scala implicit methods (aka conversions) to convert Scala objects to Datasets, DataFrames and Columns. It also defines Encoders for Scala’s "primitive" types, e.g. Int, Double, String, and their products and collections.
Note: Import the implicits with import spark.implicits._ (where spark is a SparkSession value).
implicits object offers support for creating Dataset from RDD of any type (for which an encoder exists in scope), or case classes or tuples, and Seq.
implicits object also offers conversions from Scala’s Symbol or $ to Column.
It also offers conversions from RDD or Seq of Product types (e.g. case classes or tuples) to DataFrame. It has direct conversions from RDD of Int, Long and String to DataFrame with a single column name _1.
Note: It is only possible to call toDF methods on RDD objects of Int, Long, and String "primitive" types.
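The conversions described above can be tried out with a short sketch like the one below. The `Person` case class, the sample data, and the column names are hypothetical and exist only for illustration; a local master is assumed.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local[*]")           // assumption: local mode
  .appName("implicits-demo")    // hypothetical application name
  .getOrCreate()
import spark.implicits._        // needs a stable identifier, hence `val spark`

// Hypothetical case class; in compiled code define it at the top level,
// outside of any method, so that an encoder can be derived.
case class Person(name: String, age: Int)

// Seq of case classes => Dataset[Person]
val people = Seq(Person("Ann", 30), Person("Bob", 25)).toDS

// Seq of tuples => DataFrame with explicit column names
val pairs = Seq(("a", 1), ("b", 2)).toDF("letter", "number")

// $"..." and Scala Symbols are converted to Columns
val adults = people.filter($"age" > 26)
val names = people.select('name)
```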
Creating Empty Dataset — emptyDataset method
emptyDataset[T: Encoder]: Dataset[T]
emptyDataset creates an empty Dataset (assuming that future records will be of type T).
scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]
scala> strings.printSchema
root
|-- value: string (nullable = true)
emptyDataset creates a LocalRelation logical query plan.
Creating Dataset from Local Collections and RDDs — createDataset methods
createDataset[T : Encoder](data: Seq[T]): Dataset[T]
createDataset[T : Encoder](data: RDD[T]): Dataset[T]
createDataset is an experimental API to create a Dataset from a local Scala collection, i.e. Seq[T], Java’s List[T], or a distributed RDD[T].
scala> val one = spark.createDataset(Seq(1))
one: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> one.show
+-----+
|value|
+-----+
|    1|
+-----+
createDataset creates a LocalRelation logical query plan (for the input data collection) or LogicalRDD (for the input RDD[T]).
Tip: You'd be better off using Scala implicits and the toDS method instead.
Internally, createDataset first looks up the implicit expression encoder in scope to access the AttributeReferences (of the schema).
Note: Only unresolved expression encoders are currently supported.
The expression encoder is then used to map elements (of the input Seq[T]) into a collection of InternalRows. With the references and rows, createDataset returns a Dataset with a LocalRelation logical query plan.
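To compare the explicit createDataset call with the implicit toDS route, and a local collection with an RDD, here is a small sketch. It assumes a local master; the sample numbers are arbitrary. Printing the logical plans is one way to observe the difference between the two inputs described above.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local[*]")               // assumption: local mode
  .appName("createDataset-demo")    // hypothetical application name
  .getOrCreate()
import spark.implicits._            // brings Encoder[Int] into implicit scope

// Explicit API: the Encoder[Int] context bound is resolved implicitly.
val fromSeq = spark.createDataset(Seq(1, 2, 3))

// The same result through the implicit conversion.
val viaToDS = Seq(1, 2, 3).toDS

// From a distributed RDD[Int].
val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3))
val fromRdd = spark.createDataset(rdd)

// Inspect the logical plans of the local and the RDD-based Datasets.
println(fromSeq.queryExecution.logical)
println(fromRdd.queryExecution.logical)
```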
Creating Dataset With Single Long Column — range methods
range(end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]
The range family of methods creates a Dataset of Long numbers.
scala> spark.range(start = 0, end = 4, step = 2, numPartitions = 5).show
+---+
| id|
+---+
|  0|
|  2|
+---+
Note: The first three variants (that do not specify numPartitions explicitly) use SparkContext.defaultParallelism for the number of partitions numPartitions.
Internally, range creates a new Dataset[Long] with Range logical plan and Encoders.LONG encoder.
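A quick way to see the effect of step and numPartitions is to collect the values and ask the underlying RDD for its partition count. The sketch below assumes a local master; the bounds are arbitrary.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local[*]")        // assumption: local mode
  .appName("range-demo")     // hypothetical application name
  .getOrCreate()

// end is exclusive; elements are java.lang.Long
val ids = spark.range(start = 0, end = 10, step = 3, numPartitions = 2)

// Unbox the java.lang.Long values to compare or print them as Scala Longs.
val values = ids.collect.map(_.longValue).toSeq
println(values.mkString(", "))

// The explicit numPartitions is what the underlying RDD reports.
println(ids.rdd.getNumPartitions)
```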
Creating Empty DataFrame — emptyDataFrame method
emptyDataFrame: DataFrame
emptyDataFrame creates an empty DataFrame (with no rows and columns).
It calls createDataFrame with an empty RDD[Row] and an empty schema StructType(Nil).
Creating DataFrames from RDDs with Explicit Schema — createDataFrame method
createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
createDataFrame creates a DataFrame using RDD[Row] and the input schema. It is assumed that the rows in rowRDD all match the schema.
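As an illustration of createDataFrame with an explicit schema, consider the sketch below. The field names and sample rows are made up, and a local master is assumed. Note that the rows are not validated against the schema up front, so they have to match it.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder
  .master("local[*]")                // assumption: local mode
  .appName("createDataFrame-demo")   // hypothetical application name
  .getOrCreate()

// Hypothetical schema: the field names and types are for illustration only.
val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = true)))

// Every Row has to match the schema (same arity, compatible types).
val rowRDD = spark.sparkContext.parallelize(Seq(Row("Ann", 30), Row("Bob", 25)))

val df = spark.createDataFrame(rowRDD, schema)
df.printSchema()
```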
Executing SQL Queries — sql method
sql(sqlText: String): DataFrame
sql executes the sqlText SQL statement.
scala> sql("SHOW TABLES")
res0: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]
scala> sql("DROP TABLE IF EXISTS testData")
res1: org.apache.spark.sql.DataFrame = []
// Let's create a table to SHOW it
spark.range(10).write.option("path", "/tmp/test").saveAsTable("testData")
scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| testdata|      false|
+---------+-----------+
Internally, it creates a Dataset using the current SparkSession and a logical plan. The plan is created by parsing the input sqlText using sessionState.sqlParser.
Caution: FIXME See Executing SQL Queries.
Accessing UDF Registration Interface — udf Attribute
udf: UDFRegistration
udf attribute gives access to UDFRegistration that allows registering user-defined functions for SQL-based query expressions.
val spark: SparkSession = ...
import spark.implicits._ // for toDS
spark.udf.register("myUpper", (s: String) => s.toUpperCase)
val strs = ('a' to 'c').map(_.toString).toDS
strs.createOrReplaceTempView("strs") // registerTempTable is deprecated as of Spark 2.0.0
scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
+-----+-----+
|value|UPPER|
+-----+-----+
|    a|    A|
|    b|    B|
|    c|    C|
+-----+-----+
Internally, it is an alias for SessionState.udf.
Creating DataFrames from Tables — table method
table(tableName: String): DataFrame
table creates a DataFrame from the records in the tableName table (if it exists).
val df = spark.table("mytable")
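For a self-contained run, the table can be a temporary view registered in the same session. The sketch below assumes a local master; the view name `numbers` and the missing table name are hypothetical. Looking up a table that does not exist is expected to fail, which is wrapped in Try here.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

val spark = SparkSession.builder
  .master("local[*]")       // assumption: local mode
  .appName("table-demo")    // hypothetical application name
  .getOrCreate()

// Register a temporary view under a hypothetical name...
spark.range(3).createOrReplaceTempView("numbers")

// ...and load it back as a DataFrame.
val numbers = spark.table("numbers")
numbers.show()

// A table that does not exist should result in an exception.
val missing = Try(spark.table("no_such_table_for_demo"))
println(missing.isFailure)
```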
Accessing Metastore — catalog Attribute
catalog: Catalog
catalog attribute is a (lazy) interface to the current metastore, i.e. data catalog (of relational entities like databases, tables, functions, table columns, and temporary views).
Tip: All methods in Catalog return Datasets.
scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
|              name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default|       null|  MANAGED|      false|
|              strs|    null|       null|TEMPORARY|       true|
+------------------+--------+-----------+---------+-----------+
Internally, catalog creates a CatalogImpl (referencing the current SparkSession).
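Since listTables returns a Dataset, it can be queried like any other. The following sketch, assuming a local master and a hypothetical view name `demo_view`, registers a temporary view, finds it through the catalog, and drops it again.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local[*]")         // assumption: local mode
  .appName("catalog-demo")    // hypothetical application name
  .getOrCreate()

// Register a temporary view under a hypothetical name.
spark.range(1).createOrReplaceTempView("demo_view")

// listTables returns a Dataset, so the usual operators apply.
val tables = spark.catalog.listTables()
tables.select("name", "isTemporary").show()

val tableNames = tables.collect.map(_.name).toSeq

// Clean up the temporary view.
spark.catalog.dropTempView("demo_view")
```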
Accessing DataFrameReader — read method
read: DataFrameReader
read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.
import org.apache.spark.sql.{DataFrameReader, SparkSession}
val spark: SparkSession = ... // create instance
val dfReader: DataFrameReader = spark.read
Runtime Configuration — conf attribute
conf: RuntimeConfig
conf returns the current runtime configuration (as RuntimeConfig) that wraps SQLConf.
Caution: FIXME
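Until this section is filled in, a minimal sketch of working with the runtime configuration may help. It assumes a local master; `my.hypothetical.key` is a made-up key that is deliberately never set, and the value 8 is arbitrary.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local[*]")      // assumption: local mode
  .appName("conf-demo")    // hypothetical application name
  .getOrCreate()

// Set and read back a Spark SQL property at runtime.
spark.conf.set("spark.sql.shuffle.partitions", "8")
val partitions = spark.conf.get("spark.sql.shuffle.partitions")

// getOption avoids an exception for keys that are not set.
val missing = spark.conf.getOption("my.hypothetical.key")

// All currently set properties as an immutable Map.
val all: Map[String, String] = spark.conf.getAll
println(s"$partitions, $missing, ${all.size} properties set")
```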
sessionState Property
sessionState is a transient lazy value that represents the current SessionState.
Note: sessionState is a private[sql] value, so you can only access it in code inside the org.apache.spark.sql package.
sessionState is a lazily-created value based on the internal spark.sql.catalogImplementation setting that can be:
- org.apache.spark.sql.hive.HiveSessionState for hive
- org.apache.spark.sql.internal.SessionState for in-memory
streams Attribute
streams: StreamingQueryManager
streams attribute gives access to StreamingQueryManager (through SessionState).
val spark: SparkSession = ...
spark.streams.active.foreach(println)
experimental Attribute
experimental: ExperimentalMethods
experimental is an extension point with ExperimentalMethods, a per-session collection of extra strategies and Rule[LogicalPlan]s.
Note: experimental is used in SparkPlanner and SparkOptimizer. Hive and Structured Streaming use it for their own extra strategies and optimization rules.
newSession method
newSession(): SparkSession
newSession creates (starts) a new SparkSession (with the current SparkContext and SharedState).
scala> println(sc.version)
2.0.0-SNAPSHOT
scala> val newSession = spark.newSession
newSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@122f58a
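What a new session shares with the original one, and what it keeps separate, can be checked with a sketch like the one below. It assumes a local master and a hypothetical view name `only_in_first`; it relies on temporary views being scoped to the session that registered them.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local[*]")            // assumption: local mode
  .appName("newSession-demo")    // hypothetical application name
  .getOrCreate()

val other = spark.newSession()

// Both sessions run on the same SparkContext.
println(other.sparkContext eq spark.sparkContext)

// A temporary view registered in one session (hypothetical name)...
spark.range(1).createOrReplaceTempView("only_in_first")

// ...is visible there, but not in the other session.
val visibleInFirst = spark.catalog.listTables().collect.exists(_.name == "only_in_first")
val visibleInOther = other.catalog.listTables().collect.exists(_.name == "only_in_first")
println(s"first: $visibleInFirst, other: $visibleInOther")
```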
sharedState Attribute
sharedState is the current SharedState. It is created lazily when first accessed.
SharedState
SharedState is an internal class that holds the shared state across active SQL sessions (as SparkSession instances) by sharing CacheManager, SQLListener, and ExternalCatalog.
Tip: Enable INFO logging level for the org.apache.spark.sql.internal.SharedState logger to see what happens inside. Refer to Logging.
SharedState requires a SparkContext when created. It also adds hive-site.xml to Hadoop’s Configuration in the current SparkContext if found on CLASSPATH.
Note: hive-site.xml is an optional Hive configuration file when working with Hive in Spark.
The fully-qualified class name is org.apache.spark.sql.internal.SharedState.
SharedState is created lazily, i.e. when first accessed after SparkSession is created. It can happen when a new session is created or when the shared services are accessed. It is created with a SparkContext.
When created, SharedState sets hive.metastore.warehouse.dir to spark.sql.warehouse.dir if hive.metastore.warehouse.dir is not set or spark.sql.warehouse.dir is set. Otherwise, when hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not, spark.sql.warehouse.dir is set to hive.metastore.warehouse.dir, and you should see the following INFO message in the logs:
INFO spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir ('[hiveWarehouseDir]').
In either case, you should also see the following INFO message in the logs:
INFO SharedState: Warehouse path is '[warehousePath]'.
Stopping SparkSession — stop Method
stop(): Unit
stop stops the SparkSession, i.e. stops the underlying SparkContext.
Creating SparkSession Instance
Caution: FIXME
baseRelationToDataFrame Method
Caution: FIXME